package com.fintech.pangu.actuate.metrics.rocketmq;

import com.fintech.pangu.rocketmq.metrics.AbstractRocketMQMetricsCollector;
import com.fintech.pangu.rocketmq.metrics.RocketMQConsumeMessageContext;
import com.fintech.pangu.rocketmq.metrics.RocketMQSendMessageContext;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Micrometer实现的RocketMQMetricsCollector收集器
 */
public class MicrometerRocketMQMetricsCollector extends AbstractRocketMQMetricsCollector {

    private RocketMQSendMetrics rocketMQSendMetrics;
    private RocketMQConsumeMetrics rocketMQConsumeMetrics;

    public MicrometerRocketMQMetricsCollector(RocketMQSendMetrics rocketMQSendMetrics, RocketMQConsumeMetrics rocketMQConsumeMetrics){
        this.rocketMQSendMetrics = rocketMQSendMetrics;
        this.rocketMQConsumeMetrics = rocketMQConsumeMetrics;
    }



    //============= 发送标记 =============//
    /**
     * 标记Client发送成功
     * @param context
     */
    @Override
    protected void markSendMessageSuccess(RocketMQSendMessageContext context) {
        //总数
        rocketMQSendMetrics.getSendCountCounter(context);
        //统计未发送的条数
        AtomicInteger sendDelayGauge = rocketMQSendMetrics.getSendDelayGauge(context);
        sendDelayGauge.incrementAndGet();
        //消耗时间统计
        if(context.getDuration() > 0L){
            rocketMQSendMetrics.getSendCostTime(context);
        }
    }

    /**
     * 标记Client发送失败
     * @param context
     */
    @Override
    protected void markSendMessageFailed(RocketMQSendMessageContext context) {
        rocketMQSendMetrics.getSendCountCounter(context);
    }

    /**
     * 标记内部发送成功
     * @param context
     */
    @Override
    protected void markSendKernelMessageSuccess(RocketMQSendMessageContext context) {
        //内部发送总条数
        rocketMQSendMetrics.getSendKernelCounter(context);
        //统计未发送的条数
        rocketMQSendMetrics.getSendDelayGauge(context).decrementAndGet();
    }

    /**
     * 标记内部发送失败
     * @param context
     */
    @Override
    protected void markSendKernelMessageFailure(RocketMQSendMessageContext context) {
        rocketMQSendMetrics.getSendKernelCounter(context);
    }

    //============= 消费标记 =============//

    /**
     * 标记消费端消费成功
     */
    @Override
    protected void markConsumeMessageSuccess(RocketMQConsumeMessageContext context) {

        rocketMQConsumeMetrics.getConsumeCountCounter(context);
        //统计未消费的条数
        rocketMQConsumeMetrics.getConsumeDelayGauge(context).decrementAndGet();

        if(context.getDuration() > 0L){
            rocketMQConsumeMetrics.getConsumeCostTime(context);
        }
    }

    /**
     * 标记消费端消费失败
     */
    @Override
    protected void markConsumeMessageFailure(RocketMQConsumeMessageContext context) {
        rocketMQConsumeMetrics.getConsumeCountCounter(context);
    }

    /**
     * 标记消费端内部消费成功
     */
    @Override
    protected void markConsumeKernelMessageSuccess(RocketMQConsumeMessageContext context) {
        rocketMQConsumeMetrics.getConsumeKernelCounter(context);
        //统计未消费的条数
        rocketMQConsumeMetrics.getConsumeDelayGauge(context).incrementAndGet();
    }

    /**
     * 标记消费端内部消费失败
     */
    @Override
    protected void markConsumeKernelMessageFailure(RocketMQConsumeMessageContext context) {
        rocketMQConsumeMetrics.getConsumeKernelCounter(context);
    }

}
